package ins.framework.kafkastream;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

public class WordCountLambdaExample {
    static int i=0;
    public static final void add(){
        i++;
        System.out.print(i);
    }

    public static void main(final String[] args) throws Exception {
      //  final String bootstrapServers = args.length > 0 ? args[0] : "192.168.56.128:9092";
        final String bootstrapServers = args.length > 0 ? args[0] : "10.10.1.7:9092";
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamtopic3p_group_out4");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "streamtopic3p-client");
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Specify default (de)serializers for record keys and for record values.
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,"E:\\");
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);


        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();


        final StreamsBuilder builder = new StreamsBuilder();


        final KStream<String, String> textLines = builder.stream("streamtopic3p");
        // 不是字母和数字的
        final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

                //将一条记录分成多条记录
                textLines
                .flatMapValues(value -> {
                    add();
                    System.out.println(":"+value);
                    //用不是字母和字符split,实现一行分多行
                    return Arrays.asList(pattern.split(value.toLowerCase()));
                })
                        // Write the `KTable<String, Long>` to the output topic. 对数据加工后，再回写到另一个topic中
                .to("streamtopic3p-output", Produced.with(stringSerde, stringSerde));
//                .count();


        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

//        streams.cleanUp();
        streams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}
